{
  "nbformat": 4,
  "nbformat_minor": 0,
  "metadata": {
    "colab": {
      "provenance": [],
      "private_outputs": true,
      "toc_visible": true,
      "include_colab_link": true
    },
    "kernelspec": {
      "name": "python3",
      "display_name": "Python 3"
    },
    "language_info": {
      "name": "python"
    }
  },
  "cells": [
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "view-in-github",
        "colab_type": "text"
      },
      "source": [
        "<a href=\"https://colab.research.google.com/github/devanshmodi/beam/blob/devanshmodi-patch-healthcare-hl7-to-hcapi/examples/notebooks/healthcare/beam_post_hl7_messages_to_hcapi.ipynb\" target=\"_parent\"><img src=\"https://colab.research.google.com/assets/colab-badge.svg\" alt=\"Open In Colab\"/></a>"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "id": "zQ_JXPR3RoFV"
      },
      "outputs": [],
      "source": [
        "# @title ###### Licensed to the Apache Software Foundation (ASF), Version 2.0 (the \"License\")\n",
        "\n",
        "# Licensed to the Apache Software Foundation (ASF) under one\n",
        "# or more contributor license agreements. See the NOTICE file\n",
        "# distributed with this work for additional information\n",
        "# regarding copyright ownership. The ASF licenses this file\n",
        "# to you under the Apache License, Version 2.0 (the\n",
        "# \"License\"); you may not use this file except in compliance\n",
        "# with the License. You may obtain a copy of the License at\n",
        "#\n",
        "#   http://www.apache.org/licenses/LICENSE-2.0\n",
        "#\n",
        "# Unless required by applicable law or agreed to in writing,\n",
        "# software distributed under the License is distributed on an\n",
        "# \"AS IS\" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY\n",
        "# KIND, either express or implied. See the License for the\n",
        "# specific language governing permissions and limitations\n",
        "# under the License\n",
        "\n",
        "##################################\n",
        "# Author: Devansh Modi           #\n",
        "##################################\n"
      ]
    },
    {
      "cell_type": "markdown",
      "source": [
        "**High-Level Architecture**"
      ],
      "metadata": {
        "id": "RL1LDp645ogr"
      }
    },
    {
      "cell_type": "markdown",
      "source": [
        "# **Post Hl7v2 messages to Google Cloud Healthcare API HL7v2 store pipeline**\n",
        "\n",
        "This example demonstrates how to set up an Apache Beam pipeline that reads an HL7 file from [Google Cloud Storage](https://cloud.google.com/storage) and calls the [Google Cloud Healthcare API HL7v2 store](https://cloud.google.com/healthcare-api/docs/how-tos/hl7v2-messages) to store HL7 messages. You can use this pattern to read raw HL7 messages, parse or modify them as needed to match your HL7v2 store configuration, and ingest them into the store.\n",
        "\n",
        "An Apache Beam pipeline is a pipeline that reads input data, transforms that data, and writes output data. It consists of PTransforms and PCollections. A PCollection represents a distributed data set that your Beam pipeline operates on. A PTransform represents a data processing operation, or a step, in your pipeline. It takes one or more PCollections as input, performs a processing function that you provide on the elements of that PCollection, and produces zero or more output PCollection objects.\n",
        "\n",
        "For details about Apache Beam pipelines, including PTransforms and PCollections, visit the [Beam Programming Guide](https://beam.apache.org/documentation/programming-guide/).\n",
        "\n",
        "You'll be able to use this notebook to explore the data in each PCollection."
      ],
      "metadata": {
        "id": "wC9KRrlORwKu"
      }
    },
    {
      "cell_type": "markdown",
      "source": [
        "**What is an HL7v2 message?**\n",
        "\n",
        "HL7 Messages are used to transfer electronic data between disparate healthcare systems, each sending information about a particular event such as a patient admission.\n",
        "\n",
        "An HL7 message consists of one or more segments. Each segment is displayed on a different line of text. A carriage return character (\\r, which is 0D in hexadecimal) separates one segment from another.\n",
        "\n",
        "Each segment consists of one or more composites, also known as fields. A pipe (|) character is used to separate one composite from another. If a composite contains other composites, these sub-composites (or sub-fields) are normally separated by caret (^) characters.\n",
        "\n"
      ],
      "metadata": {
        "id": "AOVYgtyaqSxa"
      }
    },
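    {
      "cell_type": "markdown",
      "source": [
        "As an illustration of this structure, the PID segment from the sample message shown below can be split into composites and sub-composites with plain Python string operations:\n",
        "\n",
        "```python\n",
        "segment = 'PID|1||21004053^^^^MRN||SULLY^BRIAN||19611209|M'\n",
        "fields = segment.split('|')          # composites (fields)\n",
        "patient_name = fields[5].split('^')  # sub-composites\n",
        "print(fields[0])        # PID\n",
        "print(patient_name[0])  # SULLY\n",
        "```"
      ],
      "metadata": {}
    },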
    {
      "cell_type": "markdown",
      "source": [
        "***Sample HL7v2 Message***\n",
        "\n",
        "The reference message below shows a sample HL7v2 message with segments separated by \\r.\n",
        "\n",
        "**MSH|^~\\&|FROM_APP|FROM_FACILITY|TO_APP|TO_FACILITY|20150503223000||ADT^A01|20150503223000|P|2.5|\\r\n",
        "EVN|A01|20110613083617|\\r\n",
        "PID|1||21004053^^^^MRN||SULLY^BRIAN||19611209|M|||123 MAIN ST^^MOUNTAIN SPRINGS^CO^80439|\\r\n",
        "PV1||I|H73 RM1^1^^HIGHWAY 73 CLINIC||||5148^MARY QUINN|||||||||Y||||||||||||||||||||||||||||20150503223000|**\n",
        "\n",
        "The file contains many such messages; the objective of this code is to split the file into individual messages and POST each one to the Google Cloud Healthcare API HL7v2 store."
      ],
      "metadata": {
        "id": "-lpbvwHmX1L5"
      }
    },
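    {
      "cell_type": "markdown",
      "source": [
        "The HL7v2 store's messages endpoint expects the raw message bytes base64-encoded inside a JSON payload, which is what the `message_to_hl7_store` method later in this notebook builds. A minimal sketch of that encoding step, using an abbreviated MSH segment:\n",
        "\n",
        "```python\n",
        "import base64\n",
        "\n",
        "hl7 = 'MSH|^~\\\\&|FROM_APP|FROM_FACILITY|TO_APP|TO_FACILITY|20150503223000||ADT^A01|20150503223000|P|2.5|'\n",
        "payload = {'message': {'data': base64.b64encode(hl7.encode()).decode()}}\n",
        "# round-trip check: decoding the payload recovers the original message\n",
        "assert base64.b64decode(payload['message']['data']).decode() == hl7\n",
        "```"
      ],
      "metadata": {}
    },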
    {
      "cell_type": "markdown",
      "source": [
        "Let's install the necessary packages"
      ],
      "metadata": {
        "id": "81wCK9XnS6Sc"
      }
    },
    {
      "cell_type": "code",
      "source": [
        "!pip install 'apache-beam[gcp,interactive]'"
      ],
      "metadata": {
        "id": "Yv1phmRZS23c"
      },
      "execution_count": null,
      "outputs": []
    },
    {
      "cell_type": "markdown",
      "source": [
        "**Google Cloud Authentication**\n",
        "\n",
        "Because this notebook uses Google Cloud Storage and the Healthcare API, you need Google Cloud credentials to authenticate your requests securely.\n",
        "\n",
        "If you don't have an account yet, use [this](https://cloud.google.com/free) link to create a new Google Cloud Platform account.\n"
      ],
      "metadata": {
        "id": "3EcdPBczYQlB"
      }
    },
    {
      "cell_type": "markdown",
      "source": [
        "**GCP Setup**\n",
        "1. Authenticate your notebook by `gcloud auth application-default login` in the Colab terminal.\n",
        "\n",
        "2. Run `gcloud config set project <YOUR-PROJECT>`\n",
        "\n",
        "Set the variables in the next cell based upon your project and preferences.\n",
        "\n",
        "Note that **us-central1** is used as the location below because of the limited number of [locations](https://cloud.google.com/healthcare-api/docs/how-tos/hl7v2-messages) the API currently supports."
      ],
      "metadata": {
        "id": "tpePe_yOsdSJ"
      }
    },
    {
      "cell_type": "markdown",
      "source": [
        "Before running, set the variables in the `args` dictionary below for your environment.\n"
      ],
      "metadata": {
        "id": "_1Q3mw1usnoE"
      }
    },
    {
      "cell_type": "code",
      "source": [
        "args = {'gcp_project':'xxx', # GCP project ID\n",
        "        'gcp_region':'xxx', # GCP project region\n",
        "        'temp_location':'gs://<YOUR Bucket>/tmp', # GCS location for temporary pipeline files\n",
        "        'input_file':'gs://<YOUR Bucket>/my_message.hl7', # GCS location of the input HL7 file\n",
        "        'hcapi_project_id':'xxxxxx', # Healthcare API project ID\n",
        "        'hcapi_dataset':'xxxx', # Healthcare API dataset\n",
        "        'hcapi_version':'v1', # Healthcare API version, v1 by default\n",
        "        'hcapi_location':'xxxx', # Healthcare API configured location\n",
        "        'hcapi_hl7_store':'xxx', # Healthcare API HL7v2 store\n",
        "        'hcapi_fhir_store':''} # Healthcare API FHIR store (optional; not used by this pipeline)"
      ],
      "metadata": {
        "id": "a722GbqdvgOX"
      },
      "execution_count": null,
      "outputs": []
    },
    {
      "cell_type": "markdown",
      "source": [
        "**Google Cloud Healthcare (HCAPI) API Utils class**\n",
        "\n",
        "The following class encapsulates the Healthcare API connection and configuration. It constructs the HCAPI URLs from the input parameters, cleans HL7 messages into the expected format, and posts HL7v2 messages to the HL7v2 store. You can add more transformations as per your requirements."
      ],
      "metadata": {
        "id": "NHzk8JIqxQoa"
      }
    },
    {
      "cell_type": "code",
      "source": [
        "import google.auth\n",
        "import google.auth.transport.requests\n",
        "import base64\n",
        "import json\n",
        "import hashlib\n",
        "import requests\n",
        "import logging\n",
        "import apache_beam as beam\n",
        "from apache_beam.options.pipeline_options import PipelineOptions\n",
        "from apache_beam.options.pipeline_options import SetupOptions\n",
        "from apache_beam.testing.test_pipeline import TestPipeline\n",
        "import apache_beam.runners.interactive.interactive_beam as ib\n",
        "from apache_beam import io\n",
        "\n",
        "logging.basicConfig(level=logging.INFO, format='%(asctime)s :: %(levelname)s :: %(message)s')\n",
        "\n",
        "class hcapi_cls:\n",
        "\n",
        "    def __init__(self, args):\n",
        "        self.hcapi_hl7_store = str(args['hcapi_hl7_store'])\n",
        "        self.hcapi_project_id = str(args['hcapi_project_id'])\n",
        "        self.hcapi_version = str(args['hcapi_version'])\n",
        "        self.hcapi_location = str(args['hcapi_location'])\n",
        "        self.hcapi_dataset = str(args['hcapi_dataset'])\n",
        "        self.hcapi_fhir_store = str(args['hcapi_fhir_store'])\n",
        "        self.token = None\n",
        "\n",
        "    def google_api_headers(self):\n",
        "        \"\"\" Function gets the token for the request \"\"\"\n",
        "        logging.info(\"fetching token and refreshing credentials\")\n",
        "        creds, project = google.auth.default()\n",
        "        auth_req = google.auth.transport.requests.Request()\n",
        "        creds.refresh(auth_req)\n",
        "        return {\n",
        "            \"Authorization\": f\"Bearer {creds.token}\",\n",
        "            \"Prefer\": \"handling=strict\"\n",
        "        }\n",
        "\n",
        "    def hcapi_dataset_url(self, version=None, project=None, location=None, dataset=None):\n",
        "        \"\"\" This function creates base hcapi dataset url and returns it \"\"\"\n",
        "        base = 'https://healthcare.googleapis.com'\n",
        "        version = self.hcapi_version\n",
        "        project = self.hcapi_project_id\n",
        "        location = self.hcapi_location\n",
        "        dataset = self.hcapi_dataset\n",
        "        return f'{base}/{version}/projects/{project}/locations/{location}/datasets/{dataset}'\n",
        "\n",
        "    def hcapi_get(self, url):\n",
        "        \"\"\" Function to send get request to HCAPI \"\"\"\n",
        "        response = requests.get(url, headers=self.google_api_headers())\n",
        "        if not response.ok:\n",
        "            raise Exception(f'Error with HC API get:\\n{response.text}')\n",
        "        return response.json()\n",
        "\n",
        "    def hcapi_post(self, url, data):\n",
        "        \"\"\" Function to send post request to HCAPI \"\"\"\n",
        "        response = requests.post(url, headers=self.google_api_headers(), json=data)\n",
        "        if not response.ok:\n",
        "            raise Exception(f'Error with HC API post:\\n{response.text}')\n",
        "        return response.json()\n",
        "\n",
        "    def hcapi_delete(self, url):\n",
        "        \"\"\" Function to send delete request to HCAPI \"\"\"\n",
        "        response = requests.delete(url, headers=self.google_api_headers())\n",
        "        if not response.ok:\n",
        "            raise Exception(f'Error with HC API delete:\\n{response.text}')\n",
        "        return response.json()\n",
        "\n",
        "    def hcapi_hl7_url(self, version=None, project=None, location=None, dataset=None, store=None):\n",
        "        \"\"\" This function creates hcapi hl7V2store url and returns the url \"\"\"\n",
        "        base_url = self.hcapi_dataset_url(version=version, project=project,\n",
        "                                      location=location, dataset=dataset)\n",
        "        hl7_store = self.hcapi_hl7_store\n",
        "        return f'{base_url}/hl7V2Stores/{hl7_store}'\n",
        "\n",
        "    def get_hl7_message(self, message_id):\n",
        "        \"\"\" Function to get message from HL7v2 store using HCAPI URL \"\"\"\n",
        "        url = f'{self.hcapi_hl7_url()}/messages/{message_id}'\n",
        "        return self.hcapi_get(url)\n",
        "\n",
        "    def post_hl7_message(self, payload):\n",
        "        \"\"\" Function to post messages to HL7v2 store \"\"\"\n",
        "        url = f'{self.hcapi_hl7_url()}/messages'\n",
        "        return self.hcapi_post(url, payload)\n",
        "\n",
        "    def message_to_hl7_store(self, message):\n",
        "        \"\"\" Function to clean up HL7 messages with \\r separator before posting to HCAPI \"\"\"\n",
        "        message = str(message)\n",
        "        message = message.replace('\\n', '\\r')\n",
        "        message = message.replace('\\\\r', '\\r')\n",
        "        message = message.replace('\\r\\r', '\\r')\n",
        "        encoded = base64.b64encode(str(message).encode())\n",
        "        payload = {\n",
        "            \"message\": {\n",
        "                \"data\": encoded.decode()\n",
        "            }\n",
        "        }\n",
        "        return self.post_hl7_message(payload)\n",
        "\n",
        "    def hcapi_fhir_url(self, version=None, project=None, location=None, dataset=None, store=None):\n",
        "        \"\"\" This function creates hcapi fhir store url and returns it \"\"\"\n",
        "        base_url = self.hcapi_dataset_url(version=version, project=project,\n",
        "                                      location=location, dataset=dataset)\n",
        "        if store is None:\n",
        "            raise Exception('No FHIR store specified')\n",
        "        return f'{base_url}/fhirStores/{store}/fhir'\n",
        "\n",
        "    def hcapi_fhir_request(self, store_key, query, data=None, method='GET'):\n",
        "        \"\"\" Function to send a request to the HCAPI FHIR store \"\"\"\n",
        "        data = data or {}\n",
        "        store = self.hcapi_fhir_store\n",
        "        if not store:\n",
        "            raise Exception(f\"Couldn't find FHIR store named {store_key} in config\")\n",
        "        url = self.hcapi_fhir_url(store=store)\n",
        "        url = f'{url}/{query}' if query else url\n",
        "        get = lambda q, d: self.hcapi_get(url)\n",
        "        post = lambda q, d: self.hcapi_post(url, data)\n",
        "        delete = lambda q, d: self.hcapi_delete(url)\n",
        "        return {'GET': get, 'POST': post, 'DELETE' : delete}[method](query, data)\n",
        "\n"
      ],
      "metadata": {
        "id": "H7g4_-rGS9P_"
      },
      "execution_count": null,
      "outputs": []
    },
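    {
      "cell_type": "markdown",
      "source": [
        "As a sanity check on the URL builders above, the resource path follows the Healthcare API convention `projects/.../locations/.../datasets/.../hl7V2Stores/...`. With hypothetical placeholder values (`my-project`, `my-dataset`, and `my-hl7-store` are not real resources):\n",
        "\n",
        "```python\n",
        "version, project = 'v1', 'my-project'  # placeholders\n",
        "location, dataset, store = 'us-central1', 'my-dataset', 'my-hl7-store'\n",
        "base = 'https://healthcare.googleapis.com'\n",
        "dataset_url = f'{base}/{version}/projects/{project}/locations/{location}/datasets/{dataset}'\n",
        "hl7_url = f'{dataset_url}/hl7V2Stores/{store}'\n",
        "print(hl7_url)\n",
        "```"
      ],
      "metadata": {}
    },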
    {
      "cell_type": "markdown",
      "source": [
        "**Pipeline Setup**\n",
        "\n",
        "This notebook uses the InteractiveRunner.\n",
        "The following DoFn classes carry out the pipeline's processing steps."
      ],
      "metadata": {
        "id": "lXnzAtbHyUd2"
      }
    },
    {
      "cell_type": "markdown",
      "source": [
        "The following class **BuildFileName** takes the file name from the element and converts it into a string. You can enhance this class to construct the GCS bucket URL if your GCS bucket prefix remains constant."
      ],
      "metadata": {
        "id": "TKnL8kxh3Kms"
      }
    },
    {
      "cell_type": "code",
      "source": [
        "class BuildFileName(beam.DoFn):\n",
        "    \"\"\" Class to get file name from variable and returns the filename \"\"\"\n",
        "    def process(self, element):\n",
        "        logging.info(\"processing the following file: {}\".format(element))\n",
        "        file_path = str(element)\n",
        "        yield file_path"
      ],
      "metadata": {
        "id": "N01E3dQd3Jr3"
      },
      "execution_count": null,
      "outputs": []
    },
    {
      "cell_type": "markdown",
      "source": [
        "The following class **BuildMessages** takes the GCS URL produced by the previous class, reads the file, separates out each message, appends the messages to a list, and returns that list for the next step."
      ],
      "metadata": {
        "id": "Jej68R8w3i2Z"
      }
    },
    {
      "cell_type": "code",
      "source": [
        "class BuildMessages(beam.DoFn):\n",
        "    \"\"\" Class to read the file, clean it, and separate messages based on MSH \"\"\"\n",
        "    def process(self, file_name):\n",
        "        try:\n",
        "            logging.info(\"starting to read file: {}\".format(file_name))\n",
        "            file = io.gcsio.GcsIO().open(filename=file_name, mode='r')\n",
        "            read_file = file.read()\n",
        "            new_file = str(read_file, encoding='utf-8').replace('\\n', '\\r')\n",
        "            logging.info(\"starting to separate HL7 messages into a list\")\n",
        "            messages=[]\n",
        "            for line in new_file.split('\\r'):\n",
        "                if line[:3] == 'MSH':\n",
        "                    messages.append(line)\n",
        "                elif messages:\n",
        "                    # re-attach continuation segments with the \\r separator\n",
        "                    messages[-1] += '\\r' + line\n",
        "\n",
        "\n",
        "            logging.info(\"total number of messages parsed are {}\".format(len(messages)))\n",
        "            return messages\n",
        "        except Exception as error:\n",
        "            logging.error(\"got the following error while processing : {}\".format('\\n' + str(error)))\n",
        "            raise\n",
        "\n",
        "\n"
      ],
      "metadata": {
        "id": "MC6tr_sGyNKG"
      },
      "execution_count": null,
      "outputs": []
    },
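    {
      "cell_type": "markdown",
      "source": [
        "The MSH-based grouping can be exercised on its own with a small two-message sample (segment content abbreviated); continuation segments are re-attached to the current message with the \\r separator:\n",
        "\n",
        "```python\n",
        "text = ('MSH|^~\\\\&|APP_A|FAC_A|||20150503223000||ADT^A01|1|P|2.5|\\r'\n",
        "        'EVN|A01|20110613083617|\\r'\n",
        "        'MSH|^~\\\\&|APP_A|FAC_A|||20150503223100||ADT^A01|2|P|2.5|\\r'\n",
        "        'PID|1||21004053|\\r')\n",
        "messages = []\n",
        "for line in text.split('\\r'):\n",
        "    if not line:\n",
        "        continue  # skip the trailing empty split\n",
        "    if line.startswith('MSH'):\n",
        "        messages.append(line)\n",
        "    elif messages:\n",
        "        messages[-1] += '\\r' + line  # re-attach continuation segments\n",
        "print(len(messages))  # 2\n",
        "```"
      ],
      "metadata": {}
    },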
    {
      "cell_type": "markdown",
      "source": [
        "The following class **PostToHL7V2Store** takes the messages returned by the previous class and POSTs each message to the HL7v2 store."
      ],
      "metadata": {
        "id": "1hpuoUGA33jo"
      }
    },
    {
      "cell_type": "code",
      "source": [
        "class PostToHL7V2Store(beam.DoFn):\n",
        "    \"\"\" Class to post each HL7 message to the HL7v2 store \"\"\"\n",
        "    def process(self, element):\n",
        "        try:\n",
        "            logging.info(\"starting to prepare and post message\")\n",
        "            hl7v2_store_response = hcapi.message_to_hl7_store(element)\n",
        "            message_id = hl7v2_store_response['name'].split(\"/\")[-1]\n",
        "            logging.info(\"successfully posted message to Hl7v2 store with message id :- {}\".format(message_id))\n",
        "\n",
        "            yield message_id\n",
        "        except Exception as error:\n",
        "            logging.error(\"got the following error while processing : {}\".format(error))\n",
        "            raise"
      ],
      "metadata": {
        "id": "lVjqYfb2330k"
      },
      "execution_count": null,
      "outputs": []
    },
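    {
      "cell_type": "markdown",
      "source": [
        "The HL7v2 store's create-message response contains the full resource `name` of the created message, and the message ID is its last path component, which is what the class above extracts. A sketch with a hypothetical resource name (the project, dataset, store, and trailing ID are all placeholders):\n",
        "\n",
        "```python\n",
        "# hypothetical resource name returned by the HL7v2 store (all segments are placeholders)\n",
        "name = ('projects/my-project/locations/us-central1/datasets/my-dataset/'\n",
        "        'hl7V2Stores/my-hl7-store/messages/EXAMPLE_MESSAGE_ID')\n",
        "message_id = name.split('/')[-1]\n",
        "print(message_id)  # EXAMPLE_MESSAGE_ID\n",
        "```"
      ],
      "metadata": {}
    },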
    {
      "cell_type": "markdown",
      "source": [
        "The following function sets up a Beam pipeline, with its pipeline options, that extracts messages from the HL7 text and posts each HL7 message to the HL7v2 store using the Google Cloud Healthcare API (HCAPI) methods.\n",
        "\n",
        "**\"|\"** is an overloaded operator that applies a PTransform to a PCollection to produce a new PCollection. Together with |, >> allows you to optionally name a PTransform.\n",
        "\n",
        "Usage:[PCollection] | [PTransform], **or** [PCollection] | [name] >> [PTransform]"
      ],
      "metadata": {
        "id": "g5oJgXCk4O1a"
      }
    },
    {
      "cell_type": "code",
      "source": [
        "import apache_beam.runners.interactive.interactive_beam as ib\n",
        "\n",
        "def run(beam_args, argv=None, save_main_session=True):\n",
        "    options = PipelineOptions(\n",
        "        flags=argv,\n",
        "        runner=\"InteractiveRunner\",\n",
        "        project=beam_args['gcp_project'],\n",
        "        job_name=\"my-beam-hl7to-hcapi\",\n",
        "        temp_location=beam_args['temp_location'],\n",
        "        region=beam_args['gcp_region'])\n",
        "    options.view_as(SetupOptions).save_main_session = save_main_session\n",
        "    with beam.Pipeline(options=options) as pipeline:\n",
        "        file = (\n",
        "            pipeline\n",
        "            | 'reading filename' >> beam.Create([beam_args['input_file']])\n",
        "            | 'preparing file path' >> beam.ParDo(BuildFileName())\n",
        "        )\n",
        "        hl7_messages=(\n",
        "            file\n",
        "            | 'parsing hl7 messages' >> beam.ParDo(BuildMessages())\n",
        "        )\n",
        "        post_hl7_messages = (\n",
        "            hl7_messages\n",
        "            | \"posting to hl7v2 Store\" >> beam.ParDo(PostToHL7V2Store())\n",
        "        )\n",
        "\n",
        "\n",
        "    ib.show_graph(pipeline)\n",
        "\n",
        "\n",
        "if __name__ == \"__main__\":\n",
        "    logging.getLogger().setLevel(logging.INFO)\n",
        "    args_dict = dict(args)\n",
        "    hcapi= hcapi_cls(args_dict)\n",
        "    run(beam_args=args_dict)"
      ],
      "metadata": {
        "id": "Dynn2PDuyRBT"
      },
      "execution_count": null,
      "outputs": []
    }
  ]
}